{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting Spark application\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<table>\n",
       "<tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th><th>Current session?</th></tr><tr><td>8</td><td>application_1561468620886_0010</td><td>pyspark</td><td>idle</td><td><a target=\"_blank\" href=\"http://hopsworks0.logicalclocks.com:8088/proxy/application_1561468620886_0010/\">Link</a></td><td><a target=\"_blank\" href=\"http://hopsworks0.logicalclocks.com:8042/node/containerlogs/container_e01_1561468620886_0010_01_000001/demo_deep_learning_admin000__meb10000\">Link</a></td><td>✔</td></tr></table>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "SparkSession available as 'spark'.\n"
     ]
    }
   ],
   "source": [
    "import numpy as np\n",
    "import random\n",
    "import pandas as pd\n",
    "from pyspark.sql import Row\n",
    "from hops import featurestore"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "area_ids = list(range(1,51))\n",
    "house_purchased_amounts = []\n",
    "house_purchases_bidders = []\n",
    "house_purchases_area_ids = []\n",
    "for i in area_ids:\n",
    "    for j in list(range(1,1000)):\n",
    "        house_purchased_amounts.append(abs(np.random.exponential()*100000)/i)\n",
    "        house_purchases_bidders.append(int(abs(np.random.exponential()*10)/i))\n",
    "        house_purchases_area_ids.append(i)\n",
    "house_purchase_ids = list(range(len(house_purchases_bidders)))\n",
    "houses_sold_data  = pd.DataFrame({\n",
    "        'area_id':house_purchases_area_ids,\n",
    "        'house_purchase_id':house_purchase_ids,\n",
    "        'number_of_bidders': house_purchases_bidders,\n",
    "        'sold_for_amount': house_purchased_amounts\n",
    "    })\n",
    "houses_sold_data_spark_df = sqlContext.createDataFrame(houses_sold_data)"
   ]
  },
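  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Spark infers the column types above from the pandas DataFrame. As a minimal sketch (illustrative, not part of the original pipeline), the same conversion can be done with an explicit schema; the `StructField` names below simply mirror the columns defined above."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.types import StructType, StructField, LongType, DoubleType\n",
    "\n",
    "# Explicit schema matching the generated pandas DataFrame\n",
    "houses_sold_schema = StructType([\n",
    "    StructField(\"area_id\", LongType(), True),\n",
    "    StructField(\"house_purchase_id\", LongType(), True),\n",
    "    StructField(\"number_of_bidders\", LongType(), True),\n",
    "    StructField(\"sold_for_amount\", DoubleType(), True)\n",
    "])\n",
    "houses_sold_data_spark_df = spark.createDataFrame(houses_sold_data, schema=houses_sold_schema)"
   ]
  },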
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-----------------+-----------------+------------------+\n",
      "|area_id|house_purchase_id|number_of_bidders|   sold_for_amount|\n",
      "+-------+-----------------+-----------------+------------------+\n",
      "|      1|                0|                0|22840.136793738166|\n",
      "|      1|                1|                8| 78309.64488345955|\n",
      "|      1|                2|                3|16265.972879369778|\n",
      "|      1|                3|               11|17829.282600311155|\n",
      "|      1|                4|                8| 36971.39422706205|\n",
      "+-------+-----------------+-----------------+------------------+\n",
      "only showing top 5 rows"
     ]
    }
   ],
   "source": [
    "houses_sold_data_spark_df.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- area_id: long (nullable = true)\n",
      " |-- house_purchase_id: long (nullable = true)\n",
      " |-- number_of_bidders: long (nullable = true)\n",
      " |-- sold_for_amount: double (nullable = true)"
     ]
    }
   ],
   "source": [
    "houses_sold_data_spark_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "sum_houses_sold_df = houses_sold_data_spark_df.groupBy(\"area_id\").sum()\n",
    "count_houses_sold_df = houses_sold_data_spark_df.groupBy(\"area_id\").count()\n",
    "sum_count_houses_sold_df = sum_houses_sold_df.join(count_houses_sold_df, \"area_id\")\n",
    "sum_count_houses_sold_df = sum_count_houses_sold_df \\\n",
    "    .withColumnRenamed(\"sum(number_of_bidders)\", \"sum_number_of_bidders\") \\\n",
    "    .withColumnRenamed(\"sum(sold_for_amount)\", \"sum_sold_for_amount\") \\\n",
    "    .withColumnRenamed(\"count\", \"num_rows\")\n",
    "def compute_average_features_houses_sold(row):\n",
    "    avg_num_bidders = row.sum_number_of_bidders/float(row.num_rows)\n",
    "    avg_sold_for = row.sum_sold_for_amount/float(row.num_rows)\n",
    "    return Row(\n",
    "        sum_number_of_bidders=row.sum_number_of_bidders, \n",
    "        sum_sold_for_amount=row.sum_sold_for_amount,\n",
    "        area_id = row.area_id,\n",
    "        avg_num_bidders = avg_num_bidders,\n",
    "        avg_sold_for = avg_sold_for\n",
    "       )\n",
    "houses_sold_features_df = sum_count_houses_sold_df.rdd.map(\n",
    "    lambda row: compute_average_features_houses_sold(row)\n",
    ").toDF()"
   ]
  },
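  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The RDD round-trip above works, but the same features can be computed in a single pass with DataFrame aggregations. A minimal sketch of the equivalent computation using `pyspark.sql.functions` (same columns, possibly in a different order):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import functions as F\n",
    "\n",
    "# Equivalent aggregation without converting to an RDD and back\n",
    "houses_sold_features_agg_df = houses_sold_data_spark_df.groupBy(\"area_id\").agg(\n",
    "    F.sum(\"number_of_bidders\").alias(\"sum_number_of_bidders\"),\n",
    "    F.sum(\"sold_for_amount\").alias(\"sum_sold_for_amount\"),\n",
    "    F.avg(\"number_of_bidders\").alias(\"avg_num_bidders\"),\n",
    "    F.avg(\"sold_for_amount\").alias(\"avg_sold_for\")\n",
    ")"
   ]
  },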
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+--------------------+------------------+---------------------+--------------------+\n",
      "|area_id|     avg_num_bidders|      avg_sold_for|sum_number_of_bidders| sum_sold_for_amount|\n",
      "+-------+--------------------+------------------+---------------------+--------------------+\n",
      "|     26| 0.07707707707707707|4012.4700950228616|                   77|   4008457.624927839|\n",
      "|     29|0.062062062062062065| 3472.587671736636|                   62|  3469115.0840648995|\n",
      "|     19|  0.1871871871871872| 5143.836954980383|                  187|   5138693.118025403|\n",
      "|     22| 0.13113113113113112| 4310.862103491131|                  131|    4306551.24138764|\n",
      "|      7|   0.986986986986987|13948.184284438423|                  986|1.3934236100153985E7|\n",
      "+-------+--------------------+------------------+---------------------+--------------------+\n",
      "only showing top 5 rows"
     ]
    }
   ],
   "source": [
    "houses_sold_features_df.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- area_id: long (nullable = true)\n",
      " |-- avg_num_bidders: double (nullable = true)\n",
      " |-- avg_sold_for: double (nullable = true)\n",
      " |-- sum_number_of_bidders: long (nullable = true)\n",
      " |-- sum_sold_for_amount: double (nullable = true)"
     ]
    }
   ],
   "source": [
    "houses_sold_features_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_deep_learning_admin000_featurestore\n",
      "Feature group created successfully"
     ]
    }
   ],
   "source": [
    "featurestore.create_featuregroup(\n",
    "    houses_sold_features_df,\n",
    "    \"houses_sold_featuregroup\",\n",
    "    description=\"aggregate features of sold houses per area\",\n",
    "    descriptive_statistics=False,\n",
    "    feature_correlation=False,\n",
    "    feature_histograms=False,\n",
    "    cluster_analysis=False\n",
    ")"
   ]
  }
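,
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To sanity-check the write, the feature group can be read back from the feature store. A minimal sketch, assuming the hops library version in this environment exposes `featurestore.get_featuregroup`, which returns the feature group as a Spark DataFrame:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read the feature group back (assumes featurestore.get_featuregroup\n",
    "# is available in this version of the hops library)\n",
    "houses_sold_featuregroup_df = featurestore.get_featuregroup(\"houses_sold_featuregroup\")\n",
    "houses_sold_featuregroup_df.show(5)"
   ]
  }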
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
